package com.shujia.flink.table

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo11CheckPoint {

  /**
   * Entry point. Configures a Flink streaming environment with HDFS-backed,
   * exactly-once checkpointing, then runs a continuous SQL job that counts
   * distinct `mdn` values per `city` from a Kafka topic and writes the
   * running result to a print sink.
   */
  def main(args: Array[String]): Unit = {
    // Streaming environment with a single parallel task.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Trigger a checkpoint every 1000 ms.
    env.enableCheckpointing(1000)

    // Tune checkpoint behaviour through one shared config handle.
    val checkpointConf = env.getCheckpointConfig
    // Exactly-once processing guarantees (this is already the default mode).
    checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Leave at least 500 ms between the end of one checkpoint and the start of the next.
    checkpointConf.setMinPauseBetweenCheckpoints(500)
    // Abort any checkpoint that takes longer than one minute.
    checkpointConf.setCheckpointTimeout(60000)
    // Never run more than one checkpoint concurrently.
    checkpointConf.setMaxConcurrentCheckpoints(1)
    // Keep checkpoints when the job is cancelled; retained checkpoints must
    // then be cleaned up from HDFS manually.
    checkpointConf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Persist checkpoint state to an HDFS-backed state backend.
    env.setStateBackend(new FsStateBackend("hdfs://master:9000/flink/checkpoint"))

    // Table environment settings: Blink planner in streaming mode.
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    // Table environment bridged onto the streaming environment above,
    // so it inherits the checkpointing configuration.
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    // Source table: CSV-encoded records read from a Kafka topic, starting
    // from the earliest offset; unparseable rows are silently skipped.
    tableEnv.executeSql(
      """
        |
        |CREATE TABLE dianxin (
        |mdn STRING,
        |grid STRING,
        |city STRING,
        |county STRING,
        |tTime INT,
        |start_time STRING,
        |end_time STRING,
        |`date` STRING
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'dianxin1',
        | 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'csv',
        | 'scan.startup.mode' = 'earliest-offset',
        | 'csv.ignore-parse-errors' = 'true'
        |)
        |
      """.stripMargin)

    // Sink table: writes each (city, count) update to stdout.
    tableEnv.executeSql(
      """
        |CREATE TABLE print_table(
        |city STRING,
        |num BIGINT
        |)
        |WITH ('connector' = 'print')
        |
      """.stripMargin)

    // Continuous query: distinct-user count per city, streamed into the sink.
    tableEnv.executeSql(
      """
        |insert into print_table
        |select
        |city,
        |count(distinct mdn) as num
        |from dianxin
        |group by city
        |
      """.stripMargin)

  }

}
